Add all files metadata tables #1626
Conversation
- `all_files`
- `all_data_files`
- `all_delete_files`
Thanks for the PR, I added a few comments
pyiceberg/table/inspect.py (outdated)
```python
all_manifest_files_by_snapshot: Iterator[List[ManifestFile]] = executor.map(
    lambda args: args[0].manifests(self.tbl.io), [(snapshot,) for snapshot in snapshots]
)
all_manifest_files = list(
    {(manifest.manifest_path, manifest) for manifest_list in all_manifest_files_by_snapshot for manifest in manifest_list}
)
all_files_by_manifest: Iterator[List[Dict[str, Any]]] = executor.map(
    lambda args: self._files_by_manifest(*args), [(manifest, data_file_filter) for _, manifest in all_manifest_files]
)
all_files_list = [file for files in all_files_by_manifest for file in files]
return pa.Table.from_pylist(
    all_files_list,
    schema=self._get_files_schema(),
)
```
WDYT about something like this?
Also, I would rename `_files_by_manifest` and have it return a `pa.Table`, so we can skip the flatten and just concat the tables (a sketch follows the suggestion below).
Suggested change:

```diff
-all_manifest_files_by_snapshot: Iterator[List[ManifestFile]] = executor.map(
-    lambda args: args[0].manifests(self.tbl.io), [(snapshot,) for snapshot in snapshots]
-)
-all_manifest_files = list(
-    {(manifest.manifest_path, manifest) for manifest_list in all_manifest_files_by_snapshot for manifest in manifest_list}
-)
-all_files_by_manifest: Iterator[List[Dict[str, Any]]] = executor.map(
-    lambda args: self._files_by_manifest(*args), [(manifest, data_file_filter) for _, manifest in all_manifest_files]
-)
-all_files_list = [file for files in all_files_by_manifest for file in files]
-return pa.Table.from_pylist(
-    all_files_list,
-    schema=self._get_files_schema(),
-)
+manifest_lists = executor.map(
+    lambda snapshot: snapshot.manifests(self.tbl.io),
+    snapshots
+)
+unique_manifests = {
+    (manifest.manifest_path, manifest)
+    for manifest_list in manifest_lists
+    for manifest in manifest_list
+}
+file_lists = executor.map(
+    self._files_by_manifest,
+    [(manifest, data_file_filter) for _, manifest in unique_manifests]
+)
+all_files = [
+    file
+    for file_list in file_lists
+    for file in file_list
+]
+return pa.Table.from_pylist(
+    all_files,
+    schema=self._get_files_schema()
+)
```
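To make the rename idea concrete, here is a minimal, hedged sketch of the concat-based ending, assuming the per-manifest helper already returns a `pa.Table` (the name `concat_file_tables` is hypothetical, not from this PR):

```python
import pyarrow as pa

# Hypothetical sketch: if the per-manifest helper returned a pa.Table,
# the caller concatenates instead of flattening lists of dicts.
def concat_file_tables(per_manifest_tables: list["pa.Table"], files_schema: "pa.Schema") -> "pa.Table":
    if not per_manifest_tables:
        # Zero-row table that still carries all the expected columns.
        return files_schema.empty_table()
    return pa.concat_tables(per_manifest_tables)
```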
I agree with this; the implementation of `_files_by_manifest` enforces uniqueness, which wasn't clear.
```python
    self, manifest_list: ManifestFile, data_file_filter: Optional[Set[DataFileContent]] = None
) -> List[Dict[str, Any]]:
    files: list[dict[str, Any]] = []
    schema = self.tbl.metadata.schema()
```
When time traveling across different snapshots, we shouldn't just use the current table schema.
For context: #1053 (comment)
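A minimal sketch of that idea, assuming pyiceberg's `Snapshot.schema_id` and `Table.schemas()` accessors (the helper name is hypothetical):

```python
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Snapshot

def schema_for_snapshot(tbl: Table, snapshot: Snapshot) -> Schema:
    """Resolve the schema that was current when the snapshot was written,
    falling back to the current table schema if it can't be resolved."""
    if snapshot.schema_id is not None:
        schema = tbl.schemas().get(snapshot.schema_id)
        if schema is not None:
            return schema
    return tbl.schema()
```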
@kevinjqliu Updated the code as per your comments.
@soumya-ghosh Gentle ping, would you be interested in contributing this? Would be great to get this in 🚀
pyiceberg/table/inspect.py (outdated)
```diff
 return pa.Table.from_pylist(
-    files,
-    schema=files_schema,
+    [],
+    schema=self._get_files_schema(),
 )
```
Nice one, this can be further simplified to `return self._get_files_schema().empty_table()`. Less is more :)
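For readers unfamiliar with the pyarrow call: `Schema.empty_table()` produces a zero-row table that still carries every column of the schema, which is exactly what this early return needs. A standalone illustration:

```python
import pyarrow as pa

# A toy schema standing in for the real files-table schema.
schema = pa.schema([("file_path", pa.string()), ("record_count", pa.int64())])
empty = schema.empty_table()  # zero rows, but all columns present
assert empty.num_rows == 0
assert empty.schema == schema
```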
pyiceberg/table/inspect.py (outdated)
```python
def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
    import pyarrow as pa

    files_table: list[pa.Table] = []
```
Nit: we can move this one down; we don't need to create the error when we return on line 642.
@soumya-ghosh I see that you incorporated the feedback by @kevinjqliu directly, instead of accepting the suggestion. That also works, thanks for working on this. I think we're pretty close 👍
Yes @Fokko, there is an open discussion happening in #1053 (comment). I will raise another PR for docs about the inspect operations.
```python
    return self._all_files({DataFileContent.DATA})

def all_delete_files(self) -> "pa.Table":
    return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
```
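For reference, a hedged usage sketch of the new API surface; `tbl` is assumed to be a loaded pyiceberg `Table`, and each call returns a `pa.Table`:

```python
# Hedged usage sketch; tbl is assumed to be a loaded pyiceberg Table.
all_files = tbl.inspect.all_files()            # data + delete files across all snapshots
data_files = tbl.inspect.all_data_files()      # DataFileContent.DATA only
delete_files = tbl.inspect.all_delete_files()  # positional + equality deletes
print(delete_files.column("file_path"))
```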
We have a Spark table to test this, in `dev/provision.py` (lines 121 to 138 at 05f07ee):
```python
for format_version in [2, 3]:
    identifier = f'{catalog_name}.default.test_positional_mor_deletes_v{format_version}'
    spark.sql(
        f"""
        CREATE OR REPLACE TABLE {identifier} (
            dt date,
            number integer,
            letter string
        )
        USING iceberg
        TBLPROPERTIES (
            'write.delete.mode'='merge-on-read',
            'write.update.mode'='merge-on-read',
            'write.merge.mode'='merge-on-read',
            'format-version'='{format_version}'
        );
        """
    )
```
Okay, will check this. If this requires changes, it will also need changes in the `files` and `delete_files` tables.
@Fokko Added an integration test for a table with format version 3; used Spark to write the data, since writes through pyiceberg to a V3 table were failing.
Note that the outputs of the `files` metadata table (and all other related tables) do not completely match their Spark counterparts, due to additional columns like `first_row_id`, `referenced_data_file`, `content_offset`, and `content_size_in_bytes`. These need to be added to the `DataFile` class first and then propagated as required. This should be addressed in a different issue; will it be part of the V3 tracking issue?
Yes, let's do that in a separate PR: #1982
Left one minor comment about the partition column; apart from that, this looks great to me. Thanks @soumya-ghosh for working on this 🙌
| "content": data_file.content, | ||
| "file_path": data_file.file_path, | ||
| "file_format": data_file.file_format, | ||
| "spec_id": data_file.spec_id, |
In Spark we also have the `partition` column; I think it would be good to add that one here as well. See `pyiceberg/table/inspect.py`, lines 124 to 125 (at 9fff025):

```python
partition_record = self.tbl.metadata.specs_struct()
pa_record_struct = schema_to_pyarrow(partition_record)
```
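For illustration, a hedged sketch of how the per-file partition struct might be populated, modeled on the existing partitions-table logic (the helper name is hypothetical):

```python
from typing import Any, Dict

from pyiceberg.manifest import DataFile
from pyiceberg.table import Table

def partition_record_for(tbl: Table, data_file: DataFile) -> Dict[str, Any]:
    """Map the file's positional partition tuple to partition field names,
    using the spec the file was written with."""
    spec = tbl.metadata.specs()[data_file.spec_id]
    return {field.name: data_file.partition[pos] for pos, field in enumerate(spec.fields)}
```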
@Fokko Added the `partition` column to the files metadata table schema and added a test for it.
Order of the `spec_id` and `partition` columns fixed.
One minor remark; apart from that it looks good.
Pinging @geruh @kevinjqliu to see if they have any further comments.
| "content": data_file.content, | ||
| "file_path": data_file.file_path, | ||
| "file_format": data_file.file_format, | ||
| "spec_id": data_file.spec_id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
Let's merge this to unblock #1958. Thanks @soumya-ghosh for working on this, and thanks @kevinjqliu and @geruh for the reviews 🙌
Implements the below metadata tables from #1053:

- `all_files`
- `all_data_files`
- `all_delete_files`

Refactored the files metadata code for better reusability.